package cc.iotkit.plugins.tcp.cilent;

import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.NetSocket;
import io.vertx.core.parsetools.RecordParser;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.codec.binary.Hex;

import java.time.Duration;

/**
 * @author huangwenl
 * @date 2022-10-13
 */
@Slf4j
public class VertxTcpClient {
    @Getter
    private String id;//客户端标识
    public NetSocket socket;//用于网络通信的套接字
    @Setter
    private long keepAliveTimeoutMs = Duration.ofSeconds(30).toMillis();//保持连接活跃的超时时间（毫秒）
    private volatile long lastKeepAliveTime = System.currentTimeMillis();//上次收到保持活跃信号的时间戳。

    @Setter
    private RecordParser parser;//用于解析接收到的数据的解析器。

    public VertxTcpClient(String id) {
        this.id = id;
    }

    //更新 lastKeepAliveTime 为当前时间
    public void keepAlive() {
        lastKeepAliveTime = System.currentTimeMillis();
    }

    /**
     * 检查客户端是否在线，基于 lastKeepAliveTime 和 keepAliveTimeoutMs。
     * @return
     */
    public boolean isOnline() {
        return System.currentTimeMillis() - lastKeepAliveTime < keepAliveTimeoutMs;
    }

    /**
     * 设置网络通信的套接字，并定义当连接关闭、接收到数据时的行为。
     * @param socket
     */
    public void setSocket(NetSocket socket) {
        synchronized (this) {
            if (this.socket != null && this.socket != socket) {
                this.socket.close();
            }

            this.socket = socket
                    .closeHandler(v -> shutdown())
                    .handler(buffer -> {
                        if (log.isDebugEnabled()) {
                            log.debug("handle tcp client[{}] payload:[{}]",
                                    socket.remoteAddress(),
                                    Hex.encodeHexString(buffer.getBytes()));
                        }
                        keepAlive();
                        parser.handle(buffer);
                        if (this.socket != socket) {
                            log.warn("tcp client [{}] memory leak ", socket.remoteAddress());
                            socket.close();
                        }
                    });
        }
    }

    /**
     * 关闭连接，如果 socket 不为空，尝试关闭它。
     */
    public void shutdown() {
        log.debug("tcp client [{}] disconnect", getId());
        synchronized (this) {
            if (null != socket) {
                execute(socket::close);
                this.socket = null;
            }
        }
    }

    /**
     * 向服务器发送消息，并记录消息发送成功或失败的日志。
     * @param buffer
     */
    public void sendMessage(Buffer buffer) {
        log.info("wirte data:{}", buffer.toString());
        socket.write(buffer, r -> {
            keepAlive();
            if (r.succeeded()) {
                log.info("client msg send success:{}", buffer.toString());
            } else {
                log.error("client msg send failed", r.cause());
            }
        });
    }

    /**
     * 安全地执行一个 Runnable 任务，捕获并记录任何异常。
     * @param runnable
     */
    private void execute(Runnable runnable) {
        try {
            runnable.run();
        } catch (Exception e) {
            log.warn("close tcp client error", e);
        }
    }

}
